package com.flink.examples.aggregate;

import com.flink.examples.DataSource;
import org.apache.flink.api.common.accumulators.AverageAccumulator;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;

/**
 * @Description Aggregate算子：提供基于事件窗口进行增量计算的函数。（对输入窗口每个数据流元素递增聚合计算，并将窗口状态与窗口内元素保持在累加器中）
 * @Author JL
 * @Date 2020/10/08
 * @Version V1.0
 */
public class Aggregate {

    /**
     * 遍历集合，分别打印不同性别的总人数与平均值
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Tuple3<姓名，性别（man男，girl女），年龄>
        List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
        DataStream<MyAverageAccumulator> dataStream = env.fromCollection(tuple3List)
                .keyBy((KeySelector<Tuple3<String, String, Integer>, String>) k -> k.f1)
                //按数量窗口滚动，每3个输入窗口数据流，计算一次
                .countWindow(3)
                //只能基于Windowed窗口Stream进行调用
                .aggregate(new AggregateFunction<Tuple3<String, String, Integer>, MyAverageAccumulator, MyAverageAccumulator>() {
                    /**
                     * 创建新累加器，开始聚合计算
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator createAccumulator() {
                        return new MyAverageAccumulator();
                    }

                    /**
                     * 将窗口输入的数据流值添加到窗口累加器，并返回新的累加器值
                     * @param tuple3
                     * @param accumulator
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator add(Tuple3<String, String, Integer> tuple3, MyAverageAccumulator accumulator) {
                        System.out.println("tuple3:" + tuple3.toString());
                        accumulator.setGender(tuple3.f1);
                        //此accumulator保含个数统计和值累计两个属性，add方法内会计算窗口内总数与求和
                        accumulator.add(tuple3.f2);
                        return accumulator;
                    }

                    /**
                     * 获取累加器聚合结果
                     * @param accumulator
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator getResult(MyAverageAccumulator accumulator) {
                        return accumulator;
                    }

                    /**
                     * 合并两个累加器，返回合并后的累加器的状态
                     * @param a
                     * @param b
                     * @return
                     */
                    @Override
                    public MyAverageAccumulator merge(MyAverageAccumulator a, MyAverageAccumulator b) {
                        a.merge(b);
                        return a;
                    }
                });
        dataStream.print();
        env.execute("flink Filter job");
    }

    /**
     * 添加性别属性（此类用于显示不同性别的平均值）
     */
    public static class MyAverageAccumulator extends AverageAccumulator{
        private String gender;
        public String getGender() {
            return gender;
        }
        public void setGender(String gender) {
            this.gender = gender;
        }
        @Override
        public String toString() {
            //继承父类的this.getLocalValue()方法用于计算并返回平均值
            return super.toString() + ", gender to " + gender;
        }
    }

}
